2. Spark Core 核心 RDD

RDD

RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。

RDD 特性

  1. 由一系列分区(分片)组成
  2. 对一个 RDD 进行操作,其实就是对一个 RDD 中的所有分区进行操作
  3. RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的

pyspark

1
2
3
4
5
6
7
8
9
10
11
12
13
vim ~/.bash_profile 

# 写入
export PYSPARK_PYTHON=python3.6
export SPARK_HOME=/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0
export PATH=$SPARK_HOME/bin:$PATH

# 保存后
source ~/.bash_profile

# 启动 pyspark (这样就是 Python 3.6 的版本了,而不是 Python2)
cd /home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/bin
./pyspark

创建 RDD

将简单的 Python 数据转为 RDD

1
2
3
4
5
6
7
8
9
>>> data = [1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data) # 这样就创建了一个 RDD。还可以加上第二个参数,表示分区的数量
>>> distData.collect() # 查看整个 RDD,同时可以在网页http://192.168.11.111:4040/stages/ 上看到任务
>>> distData.reduce(lambda a,b:a+b) # 使用 reduce 进行操作

# 将一个 RDD 切割成为很多份
>>> data = [1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data, 5) # 切割为 5 份(分区/分片)
>>> distData.collect() # 查看结果,显示的还是 [1, 2, 3, 4, 5],但是我们查看网页,就会发现 Tasks 的数量为 5 个了。典型的设置是一个 cpu 设置 2~4 个分区,即最大 1:4

外部存储数据转换为 RDD

格式只要是 Hadoop 支持的,这里都支持。例如 本地文件系统,HDFS,Cassnadra,HBase,Amazon S3 ,文件文件,二进制文件等。

  • 将本地文件转为 RDD
1
2
>>> sc.textFile("file:///home/hadoop/data/hello.txt").collect()
['hello spark', 'hello pyspark']

除了数据文件路径外,我们还可以传递一个目录,或者进行模式匹配,例如 textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz") textFile 和 parallelize 一样也支持第二个参数来进行分区,默认情况下,Spark为文件的每个块创建一个分区(HDFS中默认为128MB),也可以通过传递更大的值来请求更多的分区

  • hdfs
1
2
3
在命令行中,将刚刚创建的 hello.txt 推送到 hdfs 中去
hadoop fs -put hello.txt / # 推送
hadoop fs -text /hello.txt # 读取
1
2
>>> sc.textFile("hdfs://192.168.11.111:8020/hello.txt").collect()
['hello spark', 'hello pyspark']

开发 pyspark 应用程序

打开 Pychram 设置环境变量


内容为 spark-2.3.0-bin-2.6.0-cdh5.7.0 包中的 python 的目录和 python 的上一级目录

将我们制作的 spark-2.3.0-bin-2.6.0-cdh5.7.0 包中的 python/lib下的 py4j-0.10.6-src.zip 和 pyspark.zip 选中导入。这样就不需要安装 pyspark 和 py4j 包了。

编写测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
from pyspark import SparkContext, SparkConf

# 创建 SparkConf, 设置 Spark 相关参数信息
conf = SparkConf().setAppName("spark").setMaster("local[2]")

# 创建 SparkContext
sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData.collect())

sc.stop()

在服务器环境中运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
mkdir ~/script
vim /home/hadoop/script/spark_test.py

# 内容如下
from pyspark import SparkContext, SparkConf

conf = SparkConf()

sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData.collect())

sc.stop()

# 提交 pyspark 应用程序
cd /home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/bin
./spark-submit --master local[2] --name spark_test /home/hadoop/script/spark_test.py